Module 6: Event-Driven Development Using Queues and Streams

Developing Serverless Solutions on AWS - Live Session Guide

Module Overview

This module covers how Lambda processes events from queues (SQS) and streams (Kinesis, DynamoDB Streams) using event source mappings (polling-based invocation).

Topics

1. Event Source Mappings (Polling Pattern)

Unlike async invocations (EventBridge/SNS push to Lambda), queues and streams require Lambda to poll for records.

How It Works

Flow: Producer (your app/service) -> put -> Queue or stream (SQS / Kinesis / DynamoDB Streams) <- poll <- Lambda service (polls for batches) -> invoke (sync) -> Lambda function (your code). The event source mapping manages the polling automatically.
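The loop below is a rough mental model of the polling pattern above, simulating what an event source mapping does for a queue. It is a minimal in-memory sketch, not the Lambda service: `poll_and_invoke`, the `deque` standing in for the queue, and the handlers are hypothetical. It shows the three steps: read a batch, invoke the function synchronously with it, and delete the batch only if the invocation succeeds (otherwise the messages go back on the queue).

```python
from collections import deque
from typing import Callable, Deque, List


def poll_and_invoke(
    queue: Deque[str],
    handler: Callable[[List[str]], None],
    batch_size: int = 10,
    max_polls: int = 100,
) -> List[List[str]]:
    """Simulate an event source mapping draining an in-memory queue (hypothetical model).

    Returns the batches that were processed successfully.
    """
    processed: List[List[str]] = []
    for _ in range(max_polls):
        if not queue:
            break  # nothing to poll; a real mapping keeps long-polling the queue
        # 1. Poll: take up to batch_size messages from the front of the queue
        batch = [queue.popleft() for _ in range(min(batch_size, len(queue)))]
        try:
            # 2. Invoke the function synchronously with the whole batch
            handler(batch)
        except Exception:
            # 3a. Failure: the messages go back on the queue to be retried later
            queue.extend(batch)
            continue
        # 3b. Success: the batch is gone from the queue (deleted)
        processed.append(batch)
    return processed


messages = deque(f"m{i}" for i in range(5))
print(poll_and_invoke(messages, lambda batch: None, batch_size=2))
# [['m0', 'm1'], ['m2', 'm3'], ['m4']]
```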

Key Points

2. Amazon SQS as Lambda Event Source

Queues provide a buffer between a producer and a single consumer. Messages are processed and deleted.

How Lambda Processes SQS

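Behavior | Detail
Polling | Lambda starts with 5 polling processes and scales up based on queue depth
Batching | 1-10,000 messages per batch (configurable): up to 10,000 for Standard queues, up to 10 for FIFO queues. Batch sizes above 10 also require MaximumBatchingWindowInSeconds of at least 1 second.
Visibility Timeout | Messages are invisible to other consumers while being processed. Set the queue's visibility timeout to at least 6x your Lambda timeout.
Deletion | Lambda deletes the messages after the function successfully processes the batch (with ReportBatchItemFailures, only the items not reported as failed)
Failure | Failed messages become visible again after the visibility timeout. Configure a DLQ in the queue's redrive policy (maxReceiveCount) to capture messages that fail N times.
Concurrency | Each batch is handled by one invocation; multiple batches run as multiple concurrent instances.
Partial Batch | Report individual failures with ReportBatchItemFailures so that only the failed items are retried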
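The batching and visibility-timeout rules in the table above can be checked mechanically. The sketch below is a hypothetical helper (`SqsMappingConfig`, `check_config`), not an AWS API. It encodes the batch-size limit per queue type, the 0-300 second batching window, the rule that batch sizes above 10 need a window of at least 1 second, and the 6x visibility-timeout guideline. Adding the batching window on top of 6x the function timeout is an assumption based on AWS guidance for mappings that use a batch window.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SqsMappingConfig:
    """Hypothetical container for the SQS event source mapping settings discussed above."""
    fifo: bool
    batch_size: int
    batching_window_s: int
    function_timeout_s: int
    visibility_timeout_s: int


def recommended_visibility_timeout(function_timeout_s: int, batching_window_s: int = 0) -> int:
    """At least 6x the function timeout, plus the batching window (assumption, see lead-in)."""
    return 6 * function_timeout_s + batching_window_s


def check_config(cfg: SqsMappingConfig) -> List[str]:
    """Return a list of problems; an empty list means the settings look consistent."""
    problems: List[str] = []
    max_batch = 10 if cfg.fifo else 10_000
    if not 1 <= cfg.batch_size <= max_batch:
        problems.append(f"batch_size must be 1-{max_batch} for this queue type")
    if cfg.batch_size > 10 and cfg.batching_window_s < 1:
        problems.append("batch_size > 10 requires a batching window of at least 1 s")
    if not 0 <= cfg.batching_window_s <= 300:
        problems.append("batching window must be 0-300 s")
    needed = recommended_visibility_timeout(cfg.function_timeout_s, cfg.batching_window_s)
    if cfg.visibility_timeout_s < needed:
        problems.append(f"visibility timeout {cfg.visibility_timeout_s}s is below the recommended {needed}s")
    return problems


# Same values as the SAM template in this module: Standard queue, batch 10, 30 s timeout, 180 s visibility
print(check_config(SqsMappingConfig(fifo=False, batch_size=10, batching_window_s=0,
                                    function_timeout_s=30, visibility_timeout_s=180)))  # []
```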

SQS Queue Types

Standard Queue

  • Nearly unlimited throughput
  • At-least-once delivery (possible duplicates)
  • Best-effort ordering
  • Lambda scales up quickly (up to 1000 concurrent)

FIFO Queue

  • Exactly-once processing
  • Strict ordering within message group
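  • 300 API calls/sec per API action, or 3,000 messages/sec with batching (10 messages per call); high-throughput mode raises these limits
  • Lambda keeps at most one batch in flight per message group, so concurrency is limited by the number of active message groups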
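To make the FIFO behavior concrete, here is a simplified model of how message groups limit parallelism. For readability it assumes each batch holds messages from a single group, which real FIFO batches do not have to; `schedule_fifo` is a hypothetical name. Each round, every group with pending messages gets at most one batch, taken in arrival order, so the number of concurrent batches never exceeds the number of active groups.

```python
from typing import Dict, List, Tuple

Message = Tuple[str, str]  # (MessageGroupId, body)


def schedule_fifo(messages: List[Message], batch_size: int = 10) -> List[Dict[str, List[str]]]:
    """Simplified model of FIFO processing, one round per list entry.

    Within a round, each group has at most one batch in flight, and messages
    inside a group are taken strictly in arrival order.
    """
    pending: Dict[str, List[str]] = {}
    for group_id, body in messages:
        pending.setdefault(group_id, []).append(body)

    rounds: List[Dict[str, List[str]]] = []
    while any(pending.values()):
        this_round: Dict[str, List[str]] = {}
        for group_id, bodies in pending.items():
            if bodies:
                # Next in-order batch for this group; the group waits for it to finish
                this_round[group_id] = bodies[:batch_size]
                del bodies[:batch_size]
        rounds.append(this_round)
    return rounds


msgs = [("cust-A", "a1"), ("cust-B", "b1"), ("cust-A", "a2"), ("cust-A", "a3")]
for r in schedule_fifo(msgs, batch_size=2):
    print(r)
# {'cust-A': ['a1', 'a2'], 'cust-B': ['b1']}
# {'cust-A': ['a3']}
```

With a single hot message group, every round contains one batch, which is why FIFO throughput to Lambda depends on spreading messages across many groups.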


SQS Processing Lifecycle

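Figure: SQS processing lifecycle. Messages (M1-M5) wait in the queue and are visible; once the Lambda service receives them in a batch, they are invisible while being processed. The Lambda service polls the queue and invokes your function synchronously with the batch. On SUCCESS, the messages are deleted from the queue; on FAILURE, they return to the queue when the visibility timeout expires; after the maximum number of receives (maxReceiveCount), SQS moves them to the DLQ. Visibility timeout = 6x Lambda timeout.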

SAM Template: SQS + Lambda

Transform: AWS::Serverless-2016-10-31

Resources:
  OrderQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 180   # 6x Lambda timeout (30s)
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt OrderDLQ.Arn
        maxReceiveCount: 3

  OrderDLQ:
    Type: AWS::SQS::Queue

  ProcessOrderFn:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/            # hypothetical folder containing app.py
      Handler: app.handler
      Runtime: python3.12
      Timeout: 30
      Events:
        SQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt OrderQueue.Arn
            BatchSize: 10
            FunctionResponseTypes:
              - ReportBatchItemFailures
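The `RedrivePolicy` in the template above moves a message to the DLQ once its receive count exceeds `maxReceiveCount`. The toy model below (hypothetical `Msg` and `QueueWithRedrive` classes, not the SQS API) replays that rule: every delivery increments the receive count, success deletes the message, and a message that has already been received `max_receive_count` times is moved to the DLQ instead of being delivered again.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Msg:
    body: str
    receive_count: int = 0  # plays the role of ApproximateReceiveCount


@dataclass
class QueueWithRedrive:
    """Toy model of an SQS queue with a redrive policy (hypothetical, not the SQS API)."""
    max_receive_count: int
    messages: List[Msg] = field(default_factory=list)
    dlq: List[Msg] = field(default_factory=list)

    def receive_and_process(self, process: Callable[[str], bool]) -> None:
        """One polling pass: try to deliver each visible message once."""
        still_queued: List[Msg] = []
        for msg in self.messages:
            if msg.receive_count >= self.max_receive_count:
                # Another receive would exceed maxReceiveCount: move to the DLQ
                self.dlq.append(msg)
                continue
            msg.receive_count += 1
            if process(msg.body):
                continue  # success: the message is deleted
            still_queued.append(msg)  # failure: visible again after the timeout
        self.messages = still_queued


q = QueueWithRedrive(max_receive_count=3, messages=[Msg("ok"), Msg("poison")])
for _ in range(4):
    q.receive_and_process(lambda body: body != "poison")
print([m.body for m in q.dlq], q.dlq[0].receive_count)  # ['poison'] 3
```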

3. Kinesis Data Streams & DynamoDB Streams

Streams buffer data arriving continuously and support multiple consumers. Data is retained (not deleted on read).

How Lambda Processes Streams

Behavior | Detail
Shard-based | Lambda maintains one iterator per shard and processes records sequentially within each shard
Batching | 1-10,000 records per batch; configurable batching window (up to 5 minutes)
Ordering | Records within a shard are processed in order (the partition key determines the shard)
On failure | Default: block until the batch succeeds or the data expires. Configurable: limit retries, skip old records, bisect the batch, send failure metadata to an on-failure destination.
Concurrency | 1 concurrent invocation per shard by default; the parallelization factor allows up to 10 per shard
Pointer | Lambda tracks its position in the stream. On success, it advances the pointer; on failure, it retries from the same position.
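In Kinesis Data Streams, the partition key determines the shard: Kinesis hashes the key with MD5 to a 128-bit integer, and each shard owns a contiguous range of those hash keys. The sketch below assumes evenly split ranges (as in a freshly created stream that has not been resharded) and UTF-8 encoded keys; `even_hash_ranges` and `shard_for_key` are hypothetical helpers. It shows why records with the same key always land on the same shard and therefore stay ordered.

```python
import hashlib
from typing import List, Tuple

MAX_HASH = 2**128 - 1  # top of the 128-bit hash key space


def even_hash_ranges(shard_count: int) -> List[Tuple[int, int]]:
    """Split the hash key space evenly across shards (assumes no resharding)."""
    if shard_count < 1:
        raise ValueError("a stream has at least one shard")
    size = (MAX_HASH + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        end = MAX_HASH if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_for_key(partition_key: str, ranges: List[Tuple[int, int]]) -> int:
    """Return the index of the shard whose hash key range contains MD5(partition_key)."""
    hash_key = int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")
    for index, (start, end) in enumerate(ranges):
        if start <= hash_key <= end:
            return index
    raise ValueError("hash key not covered by any shard")


ranges = even_hash_ranges(3)
# The same key maps to the same shard every time, so its records keep their order
print(shard_for_key("device-42", ranges) == shard_for_key("device-42", ranges))  # True
```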

Kinesis vs DynamoDB Streams

Feature | Kinesis Data Streams | DynamoDB Streams
Polling rate | 1x per second per shard | 4x per second per shard
Shard count | Set manually (provisioned mode) or managed automatically (on-demand mode) | Automatic (driven by table capacity)
Scaling | Manual shard splitting/merging or auto-scaling | Shards scale with the table, and Lambda concurrency increases too
Retention | 24 hours to 365 days | 24 hours (fixed)
Use case | Real-time analytics, log aggregation, IoT | Reacting to table changes (triggers)
Consumers | Multiple (enhanced fan-out available) | Up to 2 simultaneous readers per shard (more can cause throttling)

Scaling Behavior Comparison

SQS scaling
  • As queue depth increases, Lambda adds 60 instances per minute, up to 1,000 concurrent
  • Scales aggressively with no shard management needed

Kinesis scaling
  • Concurrency = shards x parallelization factor (e.g., 10 shards x 2 = 20 concurrent)
  • Polls each shard 1x per second
  • Shards are managed manually (or with Kinesis auto-scaling)

DynamoDB Streams scaling
  • Shards scale with table capacity, and Lambda concurrency increases automatically (table-driven)
  • Polls each shard 4x per second
  • Because scaling is driven by the table, concurrency can be unpredictable
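The stream formula above (concurrency = shards x parallelization factor) is simple arithmetic, but it is handy to run it in both directions when sizing a stream. The helper names below are hypothetical; the 1-10 range for the parallelization factor comes from this module.

```python
import math


def max_stream_concurrency(shard_count: int, parallelization_factor: int = 1) -> int:
    """Upper bound on concurrent invocations for one stream event source mapping."""
    if shard_count < 1:
        raise ValueError("a stream has at least one shard")
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization factor must be 1-10")
    return shard_count * parallelization_factor


def shards_needed(target_concurrency: int, parallelization_factor: int = 1) -> int:
    """Smallest shard count that allows target_concurrency at the given factor."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization factor must be 1-10")
    return math.ceil(target_concurrency / parallelization_factor)


print(max_stream_concurrency(10, 2))  # 20, the example from the comparison above
print(shards_needed(25, 10))          # 3
```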

Kinesis Stream Processing - Shard-Based

Figure: Kinesis stream processing (shard-based). A stream with three shards (Shard 1: R1-R4, Shard 2: R1-R3, Shard 3: R1-R2), each with its own iterator position (^ = pointer) and each processed by its own Lambda instance.

Stream processing rules:
  • 1 shard = 1 concurrent Lambda invocation (unless the parallelization factor is > 1)
  • The pointer advances on success
  • Processing blocks on failure (the same batch is retried)
  • Strict ordering within each shard (per partition key when the parallelization factor is > 1)
  • Parallelization factor: up to 10x
  • Records are NOT deleted (retained 24h-365d)
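The pointer rules above can be simulated for a single shard. This sketch (hypothetical `process_shard`) advances the iterator past a batch only when the handler succeeds or when retries are exhausted. Its `max_retry_attempts` argument mirrors the `MaximumRetryAttempts` setting, where -1 (the default) means retry forever, which keeps the shard blocked; on-failure destinations are represented only by collecting the discarded batches.

```python
from typing import Callable, Dict, List


def process_shard(
    records: List[str],
    handler: Callable[[List[str]], None],
    batch_size: int = 2,
    max_retry_attempts: int = -1,
) -> Dict[str, object]:
    """Simulate one shard's iterator under default stream semantics (hypothetical model).

    max_retry_attempts = -1 retries forever, so a bad batch blocks the shard.
    """
    pointer = 0                      # iterator position in the shard
    discarded: List[List[str]] = []  # batches given up on after retries
    invocations = 0
    while pointer < len(records):
        batch = records[pointer:pointer + batch_size]
        failures = 0
        while True:
            invocations += 1
            try:
                handler(batch)
                break  # success: advance past this batch
            except Exception:
                failures += 1  # the first failure is the initial attempt
                if max_retry_attempts >= 0 and failures > max_retry_attempts:
                    discarded.append(batch)  # would go to the on-failure destination
                    break
        pointer += len(batch)  # the pointer moves only after success or giving up
    return {"pointer": pointer, "discarded": discarded, "invocations": invocations}


def handler(batch: List[str]) -> None:
    if "r3" in batch:
        raise ValueError("bad record")


print(process_shard(["r1", "r2", "r3", "r4"], handler, batch_size=2, max_retry_attempts=3))
# {'pointer': 4, 'discarded': [['r3', 'r4']], 'invocations': 5}
```

The healthy record r4 is discarded along with r3 because retries and skips happen a whole batch at a time; BisectBatchOnFunctionError exists to narrow that down.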

Error Handling for Streams

# Event source mapping configuration for streams
EventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !GetAtt MyStream.Arn
    FunctionName: !Ref ProcessStreamFn
    StartingPosition: LATEST
    BatchSize: 100
    MaximumBatchingWindowInSeconds: 5
    ParallelizationFactor: 2          # 2 concurrent per shard
    MaximumRetryAttempts: 3           # Don't block forever
    BisectBatchOnFunctionError: true  # Split batch to find bad record
    MaximumRecordAgeInSeconds: 86400  # Skip records older than 1 day
    DestinationConfig:
      OnFailure:
        Destination: !GetAtt FailedRecordsSQS.Arn  # On-failure destination: receives batch metadata, not the records
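`BisectBatchOnFunctionError: true` in the configuration above splits a failing batch in two and retries each half separately. The recursive sketch below is a simplified model of that idea (hypothetical `bisect_process`; it ignores how splits interact with `MaximumRetryAttempts` and `MaximumRecordAgeInSeconds`): it keeps halving until the bad record is isolated, so only that record ends up at the on-failure destination.

```python
from typing import Callable, List, Tuple


def bisect_process(
    batch: List[str], handler: Callable[[List[str]], None]
) -> Tuple[List[str], List[str], int]:
    """Simplified model of BisectBatchOnFunctionError.

    Returns (succeeded, failed, invocations). A failing batch is split in half
    and each half is retried on its own until the failing record is isolated.
    """
    try:
        handler(batch)
        return list(batch), [], 1
    except Exception:
        if len(batch) == 1:
            return [], list(batch), 1  # isolated bad record -> on-failure destination
        mid = len(batch) // 2
        ok_left, bad_left, n_left = bisect_process(batch[:mid], handler)
        ok_right, bad_right, n_right = bisect_process(batch[mid:], handler)
        return ok_left + ok_right, bad_left + bad_right, 1 + n_left + n_right


def handler(batch: List[str]) -> None:
    if "r3" in batch:
        raise ValueError("bad record")


ok, bad, calls = bisect_process(["r1", "r2", "r3", "r4"], handler)
print(ok, bad, calls)  # ['r1', 'r2', 'r4'] ['r3'] 5
```

Records in a half that succeeds on retry (r1 and r2 here) were already passed to the handler in the failed full-batch attempt, so stream handlers should be idempotent.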

4. Queues vs Streams - When to Use Each

Figure: Queue vs stream. QUEUE (SQS): Producer -> Queue (messages deleted) -> Consumer; 1 message = 1 consumer, and the message is removed after processing. STREAM (Kinesis): Producer -> Stream (records retained 24h-365 days) -> Consumers A, B, and C; 1 record = MULTIPLE consumers, and records stay in the stream.

Criteria | Queue (SQS) | Stream (Kinesis/DDB)
Delivery | Each message to ONE consumer | Each record to MULTIPLE consumers
After processing | Message deleted from queue | Record stays in stream (retained)
Ordering | Best-effort (Standard) or strict (FIFO) | Strict within shard (partition key)
Throughput | Nearly unlimited (Standard) | Depends on shard count
Scaling | Lambda auto-scales based on queue depth | Concurrency = number of shards x parallelization factor
Best for | Individual task processing, work queues, decoupling | Real-time analytics, multiple consumers, ordered processing
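The core difference in the table (one consumer per message vs. many consumers per record) can be shown with two toy delivery functions. Both are hypothetical models, not SQS or Kinesis APIs; round-robin assignment in `queue_delivery` stands in for whichever consumer happens to poll first.

```python
from typing import Dict, List


def queue_delivery(messages: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    """Queue model: each message is handed to exactly one consumer, then deleted."""
    delivered: Dict[str, List[str]] = {c: [] for c in consumers}
    queue = list(messages)
    turn = 0
    while queue:
        message = queue.pop(0)  # removed from the queue once processed
        delivered[consumers[turn % len(consumers)]].append(message)
        turn += 1
    return delivered


def stream_delivery(records: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    """Stream model: records stay put; each consumer keeps its own read position."""
    positions = {c: 0 for c in consumers}
    delivered: Dict[str, List[str]] = {c: [] for c in consumers}
    for c in consumers:
        while positions[c] < len(records):
            delivered[c].append(records[positions[c]])
            positions[c] += 1  # moving one consumer's position does not affect the others
    return delivered


print(queue_delivery(["m1", "m2", "m3"], ["A", "B"]))  # {'A': ['m1', 'm3'], 'B': ['m2']}
print(stream_delivery(["r1", "r2"], ["A", "B"]))       # {'A': ['r1', 'r2'], 'B': ['r1', 'r2']}
```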

Use SQS When:

  • Processing individual messages (orders, tasks, jobs)
  • Need simple decoupling between services
  • One consumer per message is fine
  • Need auto-scaling without managing shards
  • FIFO needed with moderate throughput

Use Streams When:

  • Multiple consumers need the same data
  • Deriving analytics from aggregated records
  • Strict ordering is critical
  • Need to replay/reprocess data
  • High-volume continuous data (IoT, clickstream, logs)

5. Live Demo: SQS + Lambda with DLQ

Step 1: Create SQS queue with DLQ

# Create dead-letter queue and capture its URL
DLQ_URL=$(aws sqs create-queue --queue-name demo-orders-dlq --query QueueUrl --output text)

# Get DLQ ARN
DLQ_ARN=$(aws sqs get-queue-attributes --queue-url "$DLQ_URL" \
  --attribute-names QueueArn --query 'Attributes.QueueArn' --output text)

# Create main queue with redrive policy
aws sqs create-queue --queue-name demo-orders \
  --attributes '{
    "VisibilityTimeout": "180",
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"'$DLQ_ARN'\",\"maxReceiveCount\":\"3\"}"
  }'

Step 2: Create Lambda function

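# handler.py
import json


def handler(event, context):
    """Process an SQS batch and report only the failed messages back to Lambda."""
    failed_ids = []
    for record in event["Records"]:
        try:
            body = json.loads(record["body"])
            order_id = body.get("orderId", "unknown")
            print(f"Processing order: {order_id}")

            # Simulate failure for messages that carry "fail": true
            if body.get("fail"):
                raise ValueError(f"simulated failure for {order_id}")

            print(f"  Success: {order_id}")
        except Exception as exc:
            # Any error (including malformed JSON) fails only this message
            print(f"  Failed: {record['messageId']}: {exc}")
            failed_ids.append(record["messageId"])

    # Report partial batch failures; an empty list marks the whole batch as successful
    return {
        "batchItemFailures": [
            {"itemIdentifier": mid} for mid in failed_ids
        ]
    }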
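On the Lambda side, the `batchItemFailures` list returned by the handler decides which messages are deleted and which become visible again. The function below is a simplified model of the partial-batch-response rules in the Lambda documentation, under the hypothetical name `apply_batch_response`: a null response or an empty list means complete success, while a malformed response or an itemIdentifier that is empty, null, or not in the batch makes Lambda treat the entire batch as failed.

```python
from typing import Any, List, Tuple


def apply_batch_response(message_ids: List[str], response: Any) -> Tuple[List[str], List[str]]:
    """Simplified model of how Lambda applies a ReportBatchItemFailures response.

    Returns (deleted_ids, retried_ids).
    """
    all_succeeded = (list(message_ids), [])
    all_failed = ([], list(message_ids))
    if response is None:
        return all_succeeded  # null response: complete success
    if not isinstance(response, dict):
        return all_failed  # malformed response: complete failure
    failures = response.get("batchItemFailures") or []  # missing/null/empty: success
    retried: List[str] = []
    for item in failures:
        item_id = item.get("itemIdentifier") if isinstance(item, dict) else None
        if not item_id or item_id not in message_ids:
            return all_failed  # empty, null, or unknown id: complete failure
        retried.append(item_id)
    deleted = [m for m in message_ids if m not in retried]
    return deleted, retried


ids = ["m1", "m2", "m3"]
print(apply_batch_response(ids, {"batchItemFailures": [{"itemIdentifier": "m2"}]}))
# (['m1', 'm3'], ['m2'])
```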

Step 3: Create event source mapping

aws lambda create-event-source-mapping \
  --function-name demo-order-processor \
  --event-source-arn arn:aws:sqs:us-west-2:ACCOUNT:demo-orders \
  --batch-size 5 \
  --function-response-types ReportBatchItemFailures

Step 4: Send test messages

# Success message
aws sqs send-message --queue-url https://sqs.us-west-2.amazonaws.com/ACCOUNT/demo-orders \
  --message-body '{"orderId":"ORD-001","amount":99.99}'

# Failure message (will retry then go to DLQ)
aws sqs send-message --queue-url https://sqs.us-west-2.amazonaws.com/ACCOUNT/demo-orders \
  --message-body '{"orderId":"ORD-002","amount":50,"fail":true}'

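Step 5: Check the DLQ after the failing message has been received 3 times (maxReceiveCount). Each failed attempt waits out the 180-second visibility timeout, so this takes several minutes.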

aws sqs receive-message --queue-url https://sqs.us-west-2.amazonaws.com/ACCOUNT/demo-orders-dlq

6. What's New (2024-2025 Updates)

7. Module Summary

Key Takeaways

  • Event source mappings = Lambda polls queues/streams for you
  • Lambda invokes your function synchronously with a batch
  • SQS: messages deleted after successful processing
  • Streams: pointer advances on success, retries on failure
  • Use ReportBatchItemFailures for SQS partial batch
  • Use BisectBatchOnFunctionError for streams
  • Set the visibility timeout to at least 6x the Lambda timeout (SQS)
  • Always configure a DLQ (SQS) or an on-failure destination (streams) for poison messages

Best Practices

  • Use event filtering to reduce unnecessary invocations
  • Set maximum concurrency on SQS event source mappings to protect downstream services
  • Use batch windows to aggregate small messages (cost savings)
  • Enable enhanced fan-out for Kinesis multi-consumer
  • Use parallelization factor to increase stream throughput
  • Monitor IteratorAge (streams) and ApproximateAgeOfOldestMessage (SQS)
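To see why a batch window cuts invocations for small messages, the sketch below groups message arrival times into batches the way the window works: a batch is flushed when it is full, or when the window that opened with its first message has passed. `group_into_batches` is hypothetical, and it ignores the 6 MB invocation payload limit, which also closes a batch in practice.

```python
from typing import List


def group_into_batches(arrivals: List[float], batch_size: int, window_s: float) -> List[List[float]]:
    """Group ascending arrival times (seconds) into invocation batches (hypothetical model)."""
    batches: List[List[float]] = []
    current: List[float] = []
    for t in arrivals:
        # Flush when the batch is full or this message falls outside the open window
        if current and (len(current) >= batch_size or t - current[0] > window_s):
            batches.append(current)
            current = []
        current.append(t)
    if current:
        batches.append(current)
    return batches


arrivals = [0, 0.5, 1, 1.2, 6, 6.1]
print(len(group_into_batches(arrivals, batch_size=10, window_s=0)))  # 6 invocations
print(len(group_into_batches(arrivals, batch_size=10, window_s=5)))  # 2 invocations
```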


Last updated: June 2026